package com.rivues.task;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.rivues.core.RivuDataContext;
import com.rivues.module.platform.web.model.JobDetail;
import com.rivues.task.resource.OutputTextFormat;
import com.rivues.task.resource.Resource;

/**
 * Runs a single fetch round for one {@link JobDetail}: pulls items from the
 * job's {@link Resource} one at a time, hands each to the resource for
 * processing and keeps the job's report (pages, errors, speed, status text)
 * up to date while doing so.
 *
 * <p>One instance drives one round on one thread; the counters are atomics
 * because the report may be read elsewhere while the round is running.
 */
public class Fetcher implements Runnable {

	private static final Logger log = LoggerFactory.getLogger(Fetcher.class);

	/** Interval between checks of the pause flag, in milliseconds. */
	private static final long PAUSE_POLL_MILLIS = 1000L;

	private final JobDetail job;

	private final long start = System.currentTimeMillis(); // start time of fetcher, basis for the speed figure
	private final AtomicInteger activeThreads = new AtomicInteger(0);
	private final AtomicInteger pages = new AtomicInteger(0); // total pages fetched
	private final AtomicInteger errors = new AtomicInteger(0); // total items that failed
	private Resource resource = null;
	private int processpages = 0; // value of the page counter when this fetcher was created

	/**
	 * Builds the task state for the given job and initialises its resource.
	 *
	 * <p>The page counter is seeded from the job report, and the job's last
	 * index is reset to its start index.
	 *
	 * @param job the job to run; must not be {@code null}
	 * @throws NullPointerException if {@code job} is {@code null}
	 * @throws Exception if the resource cannot be obtained or initialised, or
	 *         the job state cannot be read
	 */
	public Fetcher(JobDetail job) throws Exception {
		if (job == null) {
			throw new NullPointerException("job must not be null");
		}
		this.job = job;
		try {
			if (job.getTasktype() != null) {
				resource = Resource.getResource(job);
			}
			// Initialise the resource.
			if (resource != null) {
				resource.begin();
			}
			job.setLastindex(job.getStartindex());
			pages.set((int) job.getReport().getPages());
			processpages = pages.intValue();
		} catch (Exception e) {
			// NOTE(review): if this fails after resource.begin() succeeded, the resource is
			// never ended; the right argument for end(boolean) on this path is not visible here.
			log.error("Failed to initialise fetcher", e);
			throw e;
		}
	}

	/**
	 * Executes one fetch round.
	 *
	 * <p>Items are read from the resource while the job's fetcher flag is set.
	 * While the job is paused the thread sleeps and re-checks once per second.
	 * A failure on a single item is recorded on the report and counted, and
	 * the round continues with the next item. An interrupt stops the round;
	 * the thread's interrupt status is restored once cleanup has run.
	 */
	@Override
	public void run() {
		boolean interrupted = false;
		job.getReport().setThreads(1);
		job.getReport().setStarttime(new java.util.Date());
		try {
			activeThreads.incrementAndGet(); // count threads
			reportStatus();
			OutputTextFormat obj;
			while (job.isFetcher() && resource != null && (obj = resource.next()) != null) {
				try {
					while (job.isPause() && job.isFetcher()) {
						Thread.sleep(PAUSE_POLL_MILLIS);
					}
					output(obj);
				} catch (InterruptedException ie) {
					// Treat an interrupt as a request to stop rather than as an item error.
					// The flag is restored after cleanup so that resource.end() is not
					// executed on an already-interrupted thread.
					interrupted = true;
					break;
				} catch (Throwable t) { // unexpected failure on this item; keep going
					job.getReport().addError("0:" + t.getMessage());
					log.error("Failed to process item", t);
					errors.incrementAndGet();
				}
			}
		} catch (Throwable e) {
			log.error("Fetch round aborted", e);
			job.setExceptionMsg(e.getMessage());
		} finally {
			if (resource != null) {
				// end() also closes the resource.
				try {
					resource.end(pages.intValue() == processpages);
				} catch (Exception e) {
					log.error("Failed to end resource", e);
				}
			}
			// NOTE(review): pages was seeded from the report's existing page count, so this
			// adds the running total rather than this round's delta — confirm that is intended.
			job.setDocNum(job.getDocNum() + pages.get());
			activeThreads.decrementAndGet(); // count threads
			if (job.isFetcher()) {
				log.info("单轮任务执行完毕，任务名称：{}，采集数据量：{}", job.getName(), pages.intValue());
			} else {
				log.info("单轮任务执行被停止，任务名称：{}，已采集数据量：{}", job.getName(), pages.intValue());
			}
			if (interrupted) {
				Thread.currentThread().interrupt();
			}
		}
	}

	/**
	 * Processes one fetched item: counts it, resolves its text through the
	 * resource and, if text is available, passes it on for processing and
	 * advances the job's start index.
	 *
	 * <p>Failures are propagated to the caller, which records them on the
	 * report and in the error counter.
	 *
	 * @param object the item returned by the resource
	 * @throws Exception if reporting or resource processing fails
	 */
	private void output(OutputTextFormat object) throws Exception {
		pages.incrementAndGet();
		reportStatus();
		OutputTextFormat outputTextFormat = resource.getText(object);
		if (outputTextFormat == null) {
			return;
		}
		resource.process(outputTextFormat, job);
		job.setStartindex(job.getStartindex() + 1);
		reportStatus();
	}

	/**
	 * Copies the current counters into the job report, rebuilds the status
	 * text and publishes the report to the cluster map of running-job reports.
	 * Does nothing when the job has no report.
	 *
	 * @throws IOException declared for callers; kept from the original signature
	 */
	private void reportStatus() throws IOException {
		if (job.getReport() == null) {
			return;
		}
		long elapsed = (System.currentTimeMillis() - start) / 1000;
		job.getReport().setPages(pages.intValue());
		job.getReport().setThreads(activeThreads.intValue());
		job.getReport().setErrors(errors.intValue());
		if (elapsed != 0) {
			// Integer division first, then /10.0: speed truncated to one decimal place.
			// 10L keeps the multiplication from overflowing int for large page counts.
			job.getReport().setSpeed((float) ((pages.get() * 10L) / elapsed) / 10.0);
		}
		String detail = job.getReport().getDetailmsg() != null
				? "，详细信息：" + job.getReport().getDetailmsg()
				: "";
		String status = new StringBuilder()
				.append("已处理：").append(job.getReport().getPages())
				.append(", 错误：").append(job.getReport().getErrors())
				.append("，处理速度：").append(job.getReport().getSpeed())
				.append("条/秒，线程数：").append(job.getReport().getThreads())
				.append(detail)
				.toString();
		job.getReport().setStatus(status);
		RivuDataContext.getClusterInstance()
				.get(RivuDataContext.DistributeEventEnum.RUNNINGJOBREPORT.toString())
				.put(job.getId(), job.getReport());
	}
}